Skip to content

[SPARK-13636][SQL] Directly consume UnsafeRow in wholestage codegen plans#11484

Closed
viirya wants to merge 7 commits into
apache:masterfrom
viirya:direct-consume-unsaferow
Closed

[SPARK-13636][SQL] Directly consume UnsafeRow in wholestage codegen plans#11484
viirya wants to merge 7 commits into
apache:masterfrom
viirya:direct-consume-unsaferow

Conversation

@viirya

@viirya viirya commented Mar 3, 2016

Copy link
Copy Markdown
Member

JIRA: https://issues.apache.org/jira/browse/SPARK-13636

What changes were proposed in this pull request?

As shown in the wholestage codegen verion of Sort operator, when Sort is top of Exchange (or other operator that produce UnsafeRow), we will create variables from UnsafeRow, than create another UnsafeRow using these variables. We should avoid the unnecessary unpack and pack variables from UnsafeRows.

How was this patch tested?

All existing wholestage codegen tests should be passed.

@viirya

viirya commented Mar 3, 2016

Copy link
Copy Markdown
Member Author

Before this patch, the generated codes for the Sort operator in the plan val df = sqlContext.range(3, 0, -1).sort(col("id")):

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [id#22L ASC], true, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private UnsafeRow sort_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sort_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter sort_rowWriter;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 021 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 022 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 023 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 024 */
/* 025 */   public GeneratedIterator(Object[] references) {
/* 026 */     this.references = references;
/* 027 */   }
/* 028 */
/* 029 */   public void init(scala.collection.Iterator inputs[]) {
/* 030 */     sort_needToSort = true;
/* 031 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 032 */     sort_sorter = sort_plan.createSorter();
/* 033 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 034 */
/* 035 */     inputadapter_input = inputs[0];
/* 036 */     sort_result = new UnsafeRow(1);
/* 037 */     this.sort_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sort_result, 0)
;
/* 038 */     this.sort_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sort_hold
er, 1);
/* 039 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 040 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValu
e();
/* 041 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 042 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localVa
lue();
/* 043 */   }
/* 044 */
/* 045 */   private void sort_addToSorter() throws java.io.IOException {
/* 046 */     while (inputadapter_input.hasNext()) {
/* 047 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 048 */       /* input[0, bigint] */
/* 049 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 050 */       long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0));
/* 051 */       // Convert the input attributes to an UnsafeRow and add it to the sorter
/* 052 */
/* 053 */       sort_rowWriter.zeroOutNullBytes();
/* 054 */
/* 055 */       if (inputadapter_isNull) {
/* 056 */         sort_rowWriter.setNullAt(0);
/* 057 */       } else {
/* 058 */         sort_rowWriter.write(0, inputadapter_value);
/* 059 */       }
/* 060 */
/* 061 */       sort_sorter.insertRow(sort_result);
/* 062 */       if (shouldStop()) {
/* 063 */         return;
/* 064 */       }
/* 065 */     }
/* 066 */
/* 067 */   }
/* 068 */
/* 069 */   protected void processNext() throws java.io.IOException {
/* 070 */     if (sort_needToSort) {
/* 071 */       sort_addToSorter();
/* 072 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 073 */       sort_sortedIter = sort_sorter.sort();
/* 074 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 075 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 076 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 077 */       sort_needToSort = false;
/* 078 */     }
/* 079 */
/* 080 */     while (sort_sortedIter.hasNext()) {
/* 081 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 082 */       append(sort_outputRow.copy());
/* 083 */       if (shouldStop()) return;
/* 084 */     }
/* 085 */   }
/* 086 */ }

@viirya

viirya commented Mar 3, 2016

Copy link
Copy Markdown
Member Author

After this patch, the generated codes:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [id#22L ASC], true, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */
/* 022 */   public GeneratedIterator(Object[] references) {
/* 023 */     this.references = references;
/* 024 */   }
/* 025 */
/* 026 */   public void init(scala.collection.Iterator inputs[]) {
/* 027 */     sort_needToSort = true;
/* 028 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 029 */     sort_sorter = sort_plan.createSorter();
/* 030 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 031 */
/* 032 */     inputadapter_input = inputs[0];
/* 033 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 034 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValu
e();
/* 035 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 036 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localVa
lue();
/* 037 */   }
/* 038 */
/* 039 */   private void sort_addToSorter() throws java.io.IOException {
/* 040 */     while (inputadapter_input.hasNext()) {
/* 041 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */
/* 043 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row.copy());
/* 044 */       if (shouldStop()) {
/* 045 */         return;
/* 046 */       }
/* 047 */     }
/* 048 */
/* 049 */   }
/* 050 */
/* 051 */   protected void processNext() throws java.io.IOException {
/* 052 */     if (sort_needToSort) {
/* 053 */       sort_addToSorter();
/* 054 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 055 */       sort_sortedIter = sort_sorter.sort();
/* 056 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 057 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 058 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 059 */       sort_needToSort = false;
/* 060 */     }
/* 061 */
/* 062 */     while (sort_sortedIter.hasNext()) {
/* 063 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 064 */       append(sort_outputRow.copy());
/* 065 */       if (shouldStop()) return;
/* 066 */     }
/* 067 */   }
/* 068 */ }

@viirya

viirya commented Mar 3, 2016

Copy link
Copy Markdown
Member Author

You can find that in the method sort_addToSorter, the codes generated by this patch doesn't need to unpack fields from UnsafeRows and re-pack them back to another UnsafeRow in order to insert into the row sorter.

@viirya viirya changed the title [SPARK-13636][SQL] Direct consume UnsafeRow in wholestage codegen plans [SPARK-13636][SQL] Directly consume UnsafeRow in wholestage codegen plans Mar 3, 2016
@SparkQA

SparkQA commented Mar 3, 2016

Copy link
Copy Markdown

Test build #52367 has finished for PR 11484 at commit 6941eb1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya

viirya commented Mar 3, 2016

Copy link
Copy Markdown
Member Author

retest this please.

@SparkQA

SparkQA commented Mar 3, 2016

Copy link
Copy Markdown

Test build #52377 has finished for PR 11484 at commit 6941eb1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya

viirya commented Mar 4, 2016

Copy link
Copy Markdown
Member Author

cc @davies @rxin @nongli

protected var parent: CodegenSupport = null

/**
* Whether this SparkPlan accepts UnsafeRow as input in consumeChild.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumeChild -> doConsume

@davies

davies commented Mar 4, 2016

Copy link
Copy Markdown
Contributor

@viirya Can we wait for #11274 , then we could avoid some complicity.

@viirya

viirya commented Mar 4, 2016

Copy link
Copy Markdown
Member Author

@davies Yea. That will be good.

@kiszk

kiszk commented Mar 4, 2016

Copy link
Copy Markdown
Member

Is it better to add "in sort" in a title of this PR?

@viirya

viirya commented Mar 4, 2016

Copy link
Copy Markdown
Member Author

@kiszk this is not just for Sort operator. I just take Sort operator as an example.

@kiszk

kiszk commented Mar 4, 2016

Copy link
Copy Markdown
Member

@viirya thank you for your explanation. I understood that this PR supports sort and operations regarding whole stage code generation

viirya added 4 commits March 9, 2016 14:37
…saferow

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@SparkQA

SparkQA commented Mar 9, 2016

Copy link
Copy Markdown

Test build #52733 has finished for PR 11484 at commit dea644a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 9, 2016

Copy link
Copy Markdown

Test build #52732 has finished for PR 11484 at commit 6400eb2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya

viirya commented Mar 9, 2016

Copy link
Copy Markdown
Member Author

@davies I've addressed your comments. I also made corresponding changes for #11274. Please see if this change is good now. Thanks!

}
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
if (row != null) {
s"$sorterVariable.insertRow((UnsafeRow)$row.copy());"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the copy here? I think the sorter will copy it by itself.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I checked it. Will remove this copy call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants